{
 "cells": [
  {
   "cell_type": "raw",
   "metadata": {},
   "source": [
    "---\n",
    "title: \"Model Serving with KServe and Scikit-learn - Iris Flower Classification\"\n",
    "date: 2021-01-10\n",
    "type: technical_note\n",
    "draft: false\n",
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Model Serving with KServe and Scikit-Learn - Iris Flower Classification\n",
    "---\n",
    "\n",
    "*INPUT --> [ TRANSFORMER --> ENRICHED INPUT ] --> MODEL --> PREDICTION*\n",
    "\n",
    "<font color='red'> <h3>This notebook requires KServe to be installed</h3></font>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
 **NOTE:** It is assumed">
    "> **NOTE:** This notebook assumes that a model called *irisflowerclassifier* is already available in Hopsworks. An example of training a model for the *Iris flower classification problem* is available in `Jupyter/end_to_end_pipelines/sklearn/end_to_end_sklearn.ipynb`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Model Serving on [Hopsworks](https://github.com/logicalclocks/hopsworks)\n",
    "\n",
    "![hops.png](../../../images/hops.png)\n",
    "\n",
    "### Hopsworks Machine Learning (HSML) library\n",
    "\n",
    "`hsml` is the library to interact with the Hopsworks Model Registry and Model Serving. The library makes it easy to export, manage and deploy models. To learn more about `hsml`, see the <a href=\"https://docs.hopsworks.ai/machine-learning-api/latest\">Hopsworks Model Management</a> docs."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Serve the Iris Flower classifier\n",
    "\n",
    "Scikit-learn models can be serve directly on KServe without a custom predictor script. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Transformer script (Optional) \n",
    "\n",
    "To serve a model with a transformer, write a python script that implements the `Transformer` class and the methods `preprocess` and `postprocess`, like this:\n",
    "\n",
    "```python\n",
    "class Transformer(object):\n",
    "    def __init__(self):\n",
    "        \"\"\"Initialization code goes here\"\"\"\n",
    "        # NOTE: The env var ARTIFACT_FILES_PATH contains the path to the model artifact files\n",
    "\n",
    "    def preprocess(self, inputs):\n",
    "        \"\"\"Transform the request inputs. The object returned by this method will be used as model input.\"\"\"\n",
    "        return inputs\n",
    "\n",
    "    def postprocess(self, outputs):\n",
    "        \"\"\"Transform the predictions computed by the model before returning a response.\"\"\"\n",
    "        return outputs\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create a connection to Hopsworks"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Connected. Call `.close()` to terminate connection gracefully.\n"
     ]
    }
   ],
   "source": [
    "import hsml\n",
    "\n",
    "# Connect with Hopsworks\n",
    "conn = hsml.connection()\n",
    "\n",
    "# Retrieve the model registry handle\n",
    "mr = conn.get_model_registry()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Query Model Registry for best Iris Classifier Model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Model name: irisflowerclassifier\n",
      "Model version: 1\n",
      "{'accuracy': '0.98'}\n"
     ]
    }
   ],
   "source": [
    "MODEL_NAME = \"irisflowerclassifier\"\n",
    "\n",
    "# Get the best version of the model\n",
    "best_model = mr.get_best_model(MODEL_NAME, \"accuracy\", \"max\")\n",
    "\n",
    "print('Model name: ' + best_model.name)\n",
    "print('Model version: ' + str(best_model.version))\n",
    "print(best_model.training_metrics)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create a Deployment for the Trained Model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Deploy the trained model\n",
    "irisclassifier = best_model.deploy(serving_tool=\"KSERVE\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After the deployment has been created, you can find it in the Hopsworks UI by going to the \"Deployments\" tab. You can also use the class attributes or the `.describe()` method of a deployment object to access its metadata."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Deployment: irisflowerclassifier\n",
      "{\n",
      "    \"artifact_version\": 0,\n",
      "    \"batching_enabled\": false,\n",
      "    \"created\": \"2022-05-18T14:41:41.316Z\",\n",
      "    \"creator\": \"Admin Admin\",\n",
      "    \"id\": 8,\n",
      "    \"inference_logging\": \"ALL\",\n",
      "    \"kafka_topic_dto\": {\n",
      "        \"name\": \"CREATE\",\n",
      "        \"num_of_partitions\": 1,\n",
      "        \"num_of_replicas\": 1\n",
      "    },\n",
      "    \"model_name\": \"irisflowerclassifier\",\n",
      "    \"model_path\": \"/Projects/demo_ml_meb10000/Models/irisflowerclassifier\",\n",
      "    \"model_server\": \"PYTHON\",\n",
      "    \"model_version\": 1,\n",
      "    \"name\": \"irisflowerclassifier\",\n",
      "    \"predictor\": null,\n",
      "    \"predictor_resource_config\": {\n",
      "        \"cores\": 1,\n",
      "        \"gpus\": 0,\n",
      "        \"memory\": 1024\n",
      "    },\n",
      "    \"requested_instances\": 1,\n",
      "    \"serving_tool\": \"KSERVE\"\n",
      "}\n"
     ]
    }
   ],
   "source": [
    "print(\"Deployment: \" + irisclassifier.name)\n",
    "irisclassifier.describe()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Use a transformer to enrich model inputs (optional)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When creating a deployment for the trained model, you can attach a transformer by setting a custom transformer script."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# from hsml.transformer import Transformer\n",
    "\n",
    "# TRANSFORMER_SCRIPT_PATH = mr.project_path + \"/Jupyter/serving/kserve/sklearn/transformer.py\" # or .ipynb\n",
    "\n",
    "# irisclassifier = best_model.deploy(serving_tool=\"KSERVE\",\n",
    "#                                    transformer=Transformer(script_file=TRANSFORMER_SCRIPT_PATH))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Classify flowers with the Iris Flower classifier"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Start Deployment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "ac39f6a1a5944679afc995fdf1164f4e",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "irisclassifier.start()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Send Prediction Requests to the Deployed Model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For making inference requests you can use the `.predict()` method of the deployment metadata object."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n"
     ]
    }
   ],
   "source": [
    "for i in range(20):\n",
    "    data = {\"instances\" : [best_model.input_example]}\n",
    "    predictions = irisclassifier.predict(data)\n",
    "    print(predictions)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Monitor Prediction Logs\n",
    "\n",
    "### Consume Prediction Requests and Responses using Kafka\n",
    "\n",
    "All prediction requestst are automatically logged to Kafka which means that you can keep track for your model's performance and its predictions in a scalable manner."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hops import kafka\n",
    "from confluent_kafka import Producer, Consumer, KafkaError"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Setup Kafka consumer and subscribe to the topic containing the prediction logs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "TOPIC_NAME = irisclassifier.inference_logger.kafka_topic.name\n",
    "\n",
    "config = kafka.get_kafka_default_config()\n",
    "config['default.topic.config'] = {'auto.offset.reset': 'earliest'}\n",
    "consumer = Consumer(config)\n",
    "topics = [TOPIC_NAME]\n",
    "consumer.subscribe(topics)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Read the Kafka Avro schema from Hopsworks and setup an Avro reader"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "json_schema = kafka.get_schema(TOPIC_NAME)\n",
    "avro_schema = kafka.convert_json_schema_to_avro(json_schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Read messages from the Kafka topic, parse them with the Avro schema and print the results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "INFO -> servingId: 8, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1652885019, inferenceId:2d23b009-f968-4f6b-995a-c79b4f4fa8af, messageType:response\n",
      "Predictions -> [0]\n",
      "\n",
      "INFO -> servingId: 8, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1652885020, inferenceId:63712ace-ab66-48bf-89b7-e523745bac99, messageType:response\n",
      "Predictions -> [0]\n",
      "\n",
      "INFO -> servingId: 8, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1652885021, inferenceId:9cd56df2-02f9-4227-ac30-fba1a2c06fdf, messageType:response\n",
      "Predictions -> [0]\n",
      "\n",
      "INFO -> servingId: 8, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1652885021, inferenceId:73214f35-3792-42b8-a9e1-6a2ac4d7a5c1, messageType:response\n",
      "Predictions -> [0]\n",
      "\n",
      "INFO -> servingId: 8, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1652885022, inferenceId:485152f2-ef5d-4949-8b7f-4f0460524879, messageType:response\n",
      "Predictions -> [0]\n",
      "\n"
     ]
    }
   ],
   "source": [
    "import json\n",
    "\n",
    "PRINT_INSTANCES=False\n",
    "PRINT_PREDICTIONS=True\n",
    "\n",
    "for i in range(0, 10):\n",
    "    msg = consumer.poll(timeout=5.0)\n",
    "    if msg is not None:\n",
    "        value = msg.value()\n",
    "        try:\n",
    "            event_dict = kafka.parse_avro_msg(value, avro_schema)  \n",
    "            payload = json.loads(event_dict[\"payload\"])\n",
    "            \n",
    "            if (event_dict['messageType'] == \"request\" and not PRINT_INSTANCES) or \\\n",
    "                (event_dict['messageType'] == \"response\" and not PRINT_PREDICTIONS):\n",
    "                continue\n",
    "            \n",
    "            print(\"INFO -> servingId: {}, modelName: {}, modelVersion: {},\"\\\n",
    "                  \"requestTimestamp: {}, inferenceId:{}, messageType:{}\".format(\n",
    "                       event_dict[\"servingId\"],\n",
    "                       event_dict[\"modelName\"],\n",
    "                       event_dict[\"modelVersion\"],\n",
    "                       event_dict[\"requestTimestamp\"],\n",
    "                       event_dict[\"inferenceId\"],\n",
    "                       event_dict[\"messageType\"]))\n",
    "\n",
    "            if event_dict['messageType'] == \"request\":\n",
    "                print(\"Instances -> {}\\n\".format(payload['instances']))\n",
    "                \n",
    "            if event_dict['messageType'] == \"response\":\n",
    "                print(\"Predictions -> {}\\n\".format(payload['predictions']))\n",
    "\n",
    "        except Exception as e:\n",
    "            print(\"A message was read but there was an error parsing it\")\n",
    "            print(e)\n",
    "    else:\n",
    "        print(\"timeout.. no more messages to read from topic\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
],
"metadata": {
 "kernelspec": {
  "display_name": "Python",
  "language": "python",
  "name": "python3"
 },
 "language_info": {
  "codemirror_mode": {
   "name": "ipython",
   "version": 3
  },
  "file_extension": ".py",
  "mimetype": "text/x-python",
  "name": "python",
  "nbconvert_exporter": "python",
  "pygments_lexer": "ipython3",
  "version": "3.8.11"
 }
},
"nbformat": 4,
"nbformat_minor": 4
}